AWS IoTのジョブ機能とモノの動的グループを利用してデバイスに配布するソフトウェアを管理する
はじめに
サーバーレス開発部@大阪の岩田です。
AWS IoTのジョブ機能とモノの動的グループを利用してデバイスに配布するソフトウェアを管理する手法について調べてみました。
やること
ざっくり下記のような環境を作ります。
モノの動的グループを利用して、導入済みソフトウェアのバージョンがXX以下のデバイスに対してソフトウェアアップデートのジョブを作成。 AWS IoT側ではジョブの正常終了検知後にLambdaを起動して対象デバイスの管理情報を更新することで常にデバイスの管理情報を最新に保ち、次回のジョブ作成に備えます。
事前準備
事前にAWS IoTの設定をいくつか更新しておきます。
フリートインデックス作成の設定
モノの動的グループにはフリートインデックスが必要です。 今回はモノの属性にソフトウェアのバージョンを保持して管理する構成を取るので、「グループ名、説明、、、、」をオンにします。
イベントベースのメッセージ
ジョブの成功を検知してLambdaを起動するために、「ジョブの実行、成功、失敗、、、、」を有効にします。
テスト用のモノや証明書の準備
テストに使うモノや証明書、S3バケットなどを作成しておきます。詳細な手順は割愛します。
モノの属性についてですが、動的グループを作る際に利用するのでapp_version
という属性を追加して1.0を設定してください。
やってみる
ここから実際に試していきます。
最新版ソフトウェアのアップロード
まずデバイスに配布する最新版のソフトウェアを静的Webサイトホスティングを有効化したS3バケットにアップロードしておきます。今回は擬似的に下記のシェルスクリプトを最新版ソフトウェアとして扱います。
#!/bin/bash echo "ver 2.0"
ジョブドキュメントの作成
次にジョブドキュメントを作成して適当なS3バケットにアップします。
{ "app_url": "https://s3-ap-northeast-1.amazonaws.com/<ソフトウェアをアップしたS3バケット名>/app.sh", "app_version": "2.0" }
app_url
には最新版のソフトウェアをDLするためのURLを、app_version
には対象ソフトウェアのバージョンを設定します。
モノの動的グループを作成
バージョン2.0以下のソフトウェアを実行しているデバイスを抽出するためにモノの動的グループを作成します。
aws iot create-dynamic-thing-group --thing-group-name appver_lt_20 --query-string 'attributes.app_version < 2.0'
クエリにattributes.app_version < 2.0
を指定して対象のデバイスを抽出します。
作成が完了するとテスト用のモノがグループ内に表示されているのが確認できます。
モノの動的グループをターゲットにしたジョブの作成
作成したモノの動的グループをターゲットにしたジョブを作成します
aws iot create-job --job-id 1 --target-selection SNAPSHOT --document-source https://s3-ap-northeast-1.amazonaws.com/<ジョブドキュメントをアップしたS3バケット>/job.json --targets arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:thinggroup/appver_lt_20
デバイス側のプログラムを準備
デバイス側でジョブを処理しAWS IoTに完了通知を行うためのプログラムを用意します。
今回はAWS IoT Device SDK for PythonのサンプルプログラムjobsSample.py
を加工してデバイス側の処理をシミュレーションします。
executeJobを修正
ジョブドキュメントから取得したURLを使って最新版ソフトウェアをDLする処理を追加します。
def executeJob(self, execution): print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber'])) print('With jobDocument: ' + json.dumps(execution['jobDocument'])) app_url = execution['jobDocument']['app_url'] self.app_version = execution['jobDocument']['app_version'] urllib.request.urlretrieve(app_url, 'app.sh')
startNextJobSuccessfullyInProgressを修正
デバイス側の処理が正常終了した後、適用済みのソフトウェアバージョンをAWS IoTに通知するように処理を追加します。
...略 statusDetails = { 'HandledBy': 'ClientToken: {}'.format(self.clientToken), 'appVersion': str(self.app_version) }
この作りだと、デバイス側が申告してきたソフトウェアバージョンを鵜呑みにすることになるので、本来はLambdaがジョブIDをキーに対象のジョブドキュメントをパースして適用済みのソフトウェアバージョンを判断する方が良いでしょう。
jobsSample.pyの最終形です。
''' /* * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ ''' from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTThingJobsClient from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus import threading import logging import time import datetime import argparse import json import urllib.request class JobsMessageProcessor(object): def __init__(self, awsIoTMQTTThingJobsClient, clientToken): #keep track of this to correlate request/responses self.clientToken = clientToken self.awsIoTMQTTThingJobsClient = awsIoTMQTTThingJobsClient self.done = False self.jobsStarted = 0 self.jobsSucceeded = 0 self.jobsRejected = 0 self.app_version = '' self._setupCallbacks(self.awsIoTMQTTThingJobsClient) def _setupCallbacks(self, awsIoTMQTTThingJobsClient): self.awsIoTMQTTThingJobsClient.createJobSubscription(self.newJobReceived, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextJobSuccessfullyInProgress, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE) self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextRejected, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE) # '+' indicates a wildcard for jobId in the following subscriptions self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobSuccessful, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+') self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobRejected, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE, '+') #call back on successful job updates def startNextJobSuccessfullyInProgress(self, client, userdata, message): payload = json.loads(message.payload.decode('utf-8')) if 'execution' in payload: self.jobsStarted += 1 execution = payload['execution'] self.executeJob(execution) statusDetails = { 'HandledBy': 'ClientToken: {}'.format(self.clientToken), 'appVersion': str(self.app_version) } threading.Thread( target = self.awsIoTMQTTThingJobsClient.sendJobsUpdate, kwargs = { 'jobId': execution['jobId'], 'status': jobExecutionStatus.JOB_EXECUTION_SUCCEEDED, 'statusDetails': statusDetails, 'expectedVersion': execution['versionNumber'], 'executionNumber': execution['executionNumber']} ).start() else: print('Start next saw no execution: ' + message.payload.decode('utf-8')) self.done = True def executeJob(self, execution): print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber'])) print('With jobDocument: ' + json.dumps(execution['jobDocument'])) app_url = execution['jobDocument']['app_url'] self.app_version = execution['jobDocument']['app_version'] urllib.request.urlretrieve(app_url, 'app.sh') def newJobReceived(self, client, userdata, message): payload = json.loads(message.payload.decode('utf-8')) if 'execution' in payload: self._attemptStartNextJob() else: print('Notify next saw no execution') self.done = True def processJobs(self): self.done = False self._attemptStartNextJob() def startNextRejected(self, client, userdata, message): printf('Start next rejected:' + message.payload.decode('utf-8')) self.jobsRejected += 1 def updateJobSuccessful(self, client, userdata, message): self.jobsSucceeded += 1 def updateJobRejected(self, client, userdata, message): self.jobsRejected += 1 def _attemptStartNextJob(self): statusDetails = {'StartedBy': 'ClientToken: {} on {}'.format(self.clientToken, datetime.datetime.now().isoformat())} threading.Thread(target=self.awsIoTMQTTThingJobsClient.sendJobsStartNext, kwargs = {'statusDetails': statusDetails}).start() def isDone(self): return self.done def getStats(self): stats = {} stats['jobsStarted'] = self.jobsStarted stats['jobsSucceeded'] = self.jobsSucceeded stats['jobsRejected'] = self.jobsRejected return stats # Read in command-line parameters parser = argparse.ArgumentParser() parser.add_argument("-n", "--thingName", action="store", dest="thingName", help="Your AWS IoT ThingName to process jobs for") parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint") parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, help="Use MQTT over WebSocket") parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicJobsSampleClient", help="Targeted client id") args = parser.parse_args() host = args.host rootCAPath = args.rootCAPath certificatePath = args.certificatePath privateKeyPath = args.privateKeyPath port = args.port useWebsocket = args.useWebsocket clientId = args.clientId thingName = args.thingName if args.useWebsocket and args.certificatePath and args.privateKeyPath: parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.") exit(2) if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath): parser.error("Missing credentials for authentication.") exit(2) # Port defaults if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 port = 443 if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 port = 8883 # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) streamHandler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) # Init AWSIoTMQTTClient myAWSIoTMQTTClient = None if useWebsocket: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTClient connection configuration myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20) myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec myAWSIoTMQTTClient.configureMQTTOperationTimeout(10) # 5 sec jobsClient = AWSIoTMQTTThingJobsClient(clientId, thingName, QoS=1, awsIoTMQTTClient=myAWSIoTMQTTClient) print('Connecting to MQTT server and setting up callbacks...') jobsClient.connect() jobsMsgProc = JobsMessageProcessor(jobsClient, clientId) print('Starting to process jobs...') jobsMsgProc.processJobs() while not jobsMsgProc.isDone(): time.sleep(2) print('Done processing jobs') print('Stats: ' + json.dumps(jobsMsgProc.getStats())) jobsClient.disconnect()
Lambdaの作成
ジョブの実行完了をトリガーに起動するLambdaを準備します。 Lambda実行ロールにはAWS IoT周りの権限が付与されていることとします。
import boto3 iot_client = boto3.client('iot') def handler(event, context): if event['status'] != 'SUCCEEDED': return thing_arn = event['thingArn'] thing_name = thing_arn.split(':')[5].split('/')[1] iot_client.update_thing( thingName=thing_name, attributePayload={ 'attributes': { 'app_version': event['statusDetails']['appVersion'] } })
ルールの作成
ジョブの完了通知をトリガーにLambdaを起動するためにAWS IoTのルールを作成します。
ジョブの完了通知は予約済みのMQTTトピック$aws/events/jobExecution/jobID/succeeded
に流れてくるので、ルールクエリステートメントにはSELECT topic(4) as job_id, * FROM '$aws/events/jobExecution/+/succeeded'
を指定し、アクションには先ほど作成したLambdaを指定します。
デバイス側でジョブを実行
準備が整ったので先ほど用意したプログラムを実行してみます。
python jobsSample.py -e <AWS IoTのエンドポイント> -r <ルートCA証明書> -k <対象デバイスの秘密鍵> -c <対象デバイスの証明書> -n <モノの名前> -id <クライアントID モノの名前と揃える>
実行完了後にapp.sh
がDLされているので実行してみます。
$ sh app.sh ver2.0
ちゃんと最新版のソフトウェアが取得できています。
マネジメントコンソール上でもジョブが成功していることを確認してみましょう。
OKそうです。
対象となったモノの設定を確認するとapp_version
が2.0に更新されていることが分かります。
対象のモノがモノの動的グループから外れていることが分かります。
確認のため先ほどと同じ定義でジョブID:2のジョブを作成してみます。
aws iot create-job --job-id 2 --target-selection SNAPSHOT --document-source https://s3-ap-northeast-1.amazonaws.com/<ジョブドキュメントをアップしたS3バケット>/job.json --targets arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:thinggroup/appver_lt_20
マネジメントコンソールからジョブの進捗状況を確認します。
モノの動的グループに所属する=app_version
が2.0以下のデバイスが存在しないため、ALL:0となっています。
まとめ
AWS IoTのジョブ機能とモノの動的グループを利用して、デバイスに配布するソフトウェアを管理する手法についてご紹介しました。 今回はモノの動的グループを作成するためにモノの属性を利用しましたが、クエリの条件にはデバイスシャドウを利用することも可能です。
デバイスシャドウはモノの属性では扱えないJSON形式が扱えるため、たとえば
{ "apps": [ { "name": "app1", "version": 1.0 }, { "name": "app2", "version": 1.1 }, { "name": "app3", "version": 2.1 } ] }
といった情報を保持させて複数アプリのバージョンを管理するような使い方も出来そうです。このあたりのユースケースについて今後もさらに深掘りしていきたいと思います。
誰かの参考になれば幸いです。